Addins and Scripting
Roslyn .NET Scripting is an easy way to write custom functionality with C# scripts that are compiled during startup.
You can either write inline scripts in Foopipes.yml or load .csx scripts from a file or a URL.
Addins
Addins are scripts that are loaded, compiled and run at startup. Typically an addin registers new service types and tasks, which are then available to the pipelines.
addins:
  - url: "https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/master/Mailgun/mailgun.csx"
  - script: |
      // Registers a task named "nospaces" that strips spaces from the "name" field
      PipelineTask("nospaces").Json((context, json, ct) =>
      {
        json.Data["name"] = ((string)json.Data["name"])?.Replace(" ", "");
        return json;
      });
services:
  mailgun:
    type: mailgun
    apiBaseUrl: https://api.mailgun.net/v3/sandbox5ded26xxxxxxxxxxxxb8.mailgun.org
    apiKey: key-3a56bxxxxxxxxxxxxxxx5c
    defaultFrom: me@mydomain.com
pipelines:
  -
    when:
      - queue: started
    from:
      - http: "https://jsonplaceholder.typicode.com/posts"
    do:
      - nospaces
    to:
      - log
    error:
      - { mailgun.send, to: me@mydomain.com, subject: Error, text: An error occurred }
Community Addins
Addins created by the community are available in a public repository on GitHub: https://github.com/AreteraAB/Foopipes.Addins.
When loading a community addin, use the raw file URL in this format: https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/master/Mailgun/mailgun.csx.
Consider pinning the URL to a tag or commit hash instead of master, so that the script you load does not change unexpectedly: https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/47e1e6d74f2546673b74f929f9ebf74ca56afae5/Tail/Tail.csx.
Pull requests are welcome!
Registering tasks
Register custom tasks by passing a callback to the builder returned by the method PipelineTask(string name).
Pipeline tasks can be json, binary, or dynamic, depending on what kind of data they are able to handle. A task name can have both a json and a binary callback.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
    // Do something here with json data
    return json;
});
PipelineTask("mytask").Binary(async (context, data, cancellationToken) =>
{
    // Do something here with binary data
    return data;
});
Registering services
Custom services are the best way to keep state. You can either register a service type which later can be referenced in the configuration file or a service instance which will be a named singleton.
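using System.Threading;
using Foopipes.Abstractions.Services;
class MyService : ServiceBase
{
    // Read from the service's entry in the configuration file
    public string MyConfigValue => Config["myConfigValue"];
    private int _counter = 0;
    // Atomic increment, safe to call from several tasks at once
    public int IncrementCounter()
    {
        return Interlocked.Increment(ref _counter);
    }
}
Service.Register("myserviceType", typeof(MyService));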
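The counter above uses Interlocked.Increment rather than a plain _counter++. The likely reason (an assumption, the source does not state it) is that one service instance may be called from several tasks at the same time. The standalone sketch below uses no Foopipes types; Counter is a hypothetical stand-in for the stateful part of a service and only shows that the atomic increment does not lose updates under parallel load.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the stateful part of a service; not a Foopipes type.
class Counter
{
    private int _value = 0;

    // Atomic read-modify-write; returns the incremented value.
    public int Increment() => Interlocked.Increment(ref _value);

    public int Value => Volatile.Read(ref _value);
}

var counter = new Counter();

// 8 workers each increment 1000 times, potentially in parallel.
Parallel.For(0, 8, _ =>
{
    for (var i = 0; i < 1000; i++)
    {
        counter.Increment();
    }
});

Console.WriteLine(counter.Value); // 8000
```

With a non-atomic increment the final value could come out lower, because two threads can read the same old value before either writes back.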
Create an instance and configure your service in the configuration file:
services:
  myserviceInstanceName:
    type: myserviceType
    myConfigValue: hello
In your tasks, you can get hold of a service instance like this:
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
    var service = await context.GetService<MyService>(defaultName: "myserviceInstanceName");
    json.Data["counter"] = service.IncrementCounter();
    return json;
});
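The text above also mentions registering a service instance as a named singleton, without showing it. The following is a minimal sketch that relies on Service.Instance(string name, IService instance) as listed in the Class Reference. SeenIds, the task name dedupe and the "id" field are hypothetical, and it is an assumption that an instance registered this way can be looked up by name without an entry under services: in the configuration file.

```csharp
using System.Collections.Generic;
using Foopipes.Abstractions.Services;

// Hypothetical service that remembers which ids it has already seen.
class SeenIds : ServiceBase
{
    private readonly HashSet<string> _ids = new HashSet<string>();

    // Returns true the first time an id is seen, false afterwards.
    public bool MarkSeen(string id)
    {
        lock (_ids)
        {
            return _ids.Add(id);
        }
    }
}

// Register one shared instance under the name "seenIds".
Service.Instance("seenIds", new SeenIds());

PipelineTask("dedupe").Json(async (context, json, cancellationToken) =>
{
    var seen = await context.GetService<SeenIds>(defaultName: "seenIds");
    // Flag the record instead of dropping it, so later tasks can decide what to do.
    json.Data["isNew"] = seen.MarkSeen((string)json.Data["id"]);
    return json;
});
```

Because the instance is created in the script, its state lives only as long as the Foopipes process.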
Observer/observable pattern
If your service implements IObservableService, it can emit events that trigger pipelines. This is very powerful when combined with IRunnableService and System.Reactive.
#r "System.Reactive"
using System;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Foopipes.Abstractions.Services;
class MyObservableService : ServiceBase, IObservableService, IRunnableService
{
    private readonly Subject<ServiceEvent> _subject = new Subject<ServiceEvent>();
    // IObservableService: the stream of events that pipelines can listen to
    public IObservable<ServiceEvent> Observable => _subject;
    // IRunnableService: emits one event per second until cancelled
    public async Task Run(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(1000, cancellationToken);
            var metadata = JObject.FromObject(new
            {
                currentTime = DateTime.Now,
            });
            var serviceEvent = new ServiceEvent(this, metadata, new[] { new BinaryData(new byte[] { 0x42 }) });
            _subject.OnNext(serviceEvent);
        }
    }
}
Service.Register("myObservableService", typeof(MyObservableService));
services:
  myObservableService:
    type: myObservableService
pipelines:
  -
    when:
      - myObservableService
    to:
      - log
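To see the push model in isolation, here is a minimal sketch of the observer pattern that uses only the IObservable<T> and IObserver<T> interfaces from the base class library, without System.Reactive or any Foopipes types. TickSource and Recorder are hypothetical names. In the service above, Subject<ServiceEvent> plays the role of TickSource; that Foopipes acts as the subscriber is an assumption based on the interface name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal event source; Subject<T> from System.Reactive does this and more.
class TickSource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // Push a value to every current subscriber.
    public void Emit(int value)
    {
        foreach (var observer in _observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    // Disposing the subscription removes the observer from the list.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> _observers;
        private readonly IObserver<int> _observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() => _observers.Remove(_observer);
    }
}

// Hypothetical subscriber that records what it receives.
class Recorder : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public void OnNext(int value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

var source = new TickSource();
var recorder = new Recorder();

var subscription = source.Subscribe(recorder);
source.Emit(1);
source.Emit(2);
subscription.Dispose();
source.Emit(3); // not delivered: the recorder has unsubscribed

Console.WriteLine(string.Join(",", recorder.Received)); // 1,2
```

This sketch is not thread-safe and ignores completion and errors, which is one reason to prefer Subject<T> in a real service.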
Invoking tasks from a task
You can write tasks that invoke other tasks.
PipelineTask("sendGreeting").Json(async (context, json, cancellationToken) =>
{
    // Build the payload from this task's "name" config value, with bindings expanded
    var data = JObject.FromObject(new
    {
        greeting = "hello " + await context.GetExpandedConfigValue("name")
    });
    // Arguments passed to the invoked http task
    var config = new Dictionary<string, string>
    {
        {"url", "https://www.myservice.com/api" },
        {"method", "post"},
        {"body", "formUrlEncoded"}
    };
    // Invoke the http task; its result is not used here
    var r = await context.RunTask("http").WithData(data).WithArguments(config).Invoke(cancellationToken);
    return json;
});
Invoke with:
do:
  - { sendGreeting, name: "Foo #{lastname}" }
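In the configuration above, "Foo #{lastname}" contains a binding expression that GetExpandedConfigValue expands before the greeting is built. The standalone sketch below only illustrates the idea of placeholder expansion with a regular expression and a dictionary. Expand is a hypothetical helper and is not how Foopipes implements data binding, which can also resolve values from services.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical helper: replaces each #{key} with values[key].
// Placeholders with unknown keys are left untouched.
string Expand(string template, IDictionary<string, string> values)
{
    return Regex.Replace(template, @"#\{([^}]+)\}", match =>
    {
        var key = match.Groups[1].Value;
        return values.TryGetValue(key, out var value) ? value : match.Value;
    });
}

// Stand-in for the fields of the current record.
var record = new Dictionary<string, string> { ["lastname"] = "Bar" };

Console.WriteLine("hello " + Expand("Foo #{lastname}", record)); // hello Foo Bar
```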
Returning results
Task callbacks can return json and/or binary data.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
    return new JsonData(JObject.FromObject(new { hello = "world" }));
});
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
    return new BinaryData(new byte[] { 0x42 });
});
It's also possible to return multiple results.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
    return new ProcessJsonResult(new[]
    {
        JObject.FromObject(new { hello = "world1" }),
        JObject.FromObject(new { hello = "world2" }),
    });
});
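A typical use of multiple results is splitting one record into several. The sketch below assumes that the incoming record has an array property named "items" (a hypothetical field) and returns one result per object in that array, using the same ProcessJsonResult constructor as above. The task name splititems is also hypothetical.

```csharp
using System.Linq;
using Newtonsoft.Json.Linq;

PipelineTask("splititems").Json((context, json, cancellationToken) =>
{
    // Treat a missing or non-array "items" property as an empty list.
    var items = json.Data["items"] as JArray ?? new JArray();

    // Keep only the array elements that are JSON objects, one result per element.
    return new ProcessJsonResult(items.OfType<JObject>().ToArray());
});
```

If the array is empty, the task returns a result with no records. How Foopipes treats an empty result is not covered by the text above.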
Referencing other assemblies
You can reference assemblies using the #r syntax.
#r "System.Security.Cryptography.Csp"
using System.Security.Cryptography;
var _aes = Aes.Create();
PipelineTask("decryptstring").Binary(async (context, binary, cancellationToken) =>
{
    // key, iv and decryptedData are placeholders; obtaining them is left out of this example
    using (var decryptor = _aes.CreateDecryptor(key, iv))
    {
        // etc etc
    }
    return JObject.FromObject(new { value = decryptedData });
});
Currently, it is not possible to reference NuGet packages.
Data binding
Use context.BindValue(string bindingExpression) to obtain a value using the data binding functionality.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
    var myvalue = await context.BindValue("#{elasticsearch:myvalue}");
    json.Data["boundValue"] = myvalue;
    return json;
});
Similarly, use context.SetValue(string path, object value) to set a value.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
    await context.SetValue("elasticsearch:myvalue", "hello world");
    return json;
});
Class Reference
Addin host globals:
{
    ITaskBuilder PipelineTask(string name);
    IServiceBuilder Service { get; }
}
interface ITaskBuilder
{
    /************ Async Json ************/
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<IProcessResult>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JsonData>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JObject>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JObject[]>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<BinaryData>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<byte[]>> func);
    /************ Non-async Json ************/
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, IProcessResult> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JsonData> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JObject> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JObject[]> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, byte[]> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, BinaryData> func);
    /************ Async Binary ************/
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<IProcessResult>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JsonData>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JObject>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JObject[]>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<BinaryData>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<byte[]>> func);
    /************ Non-async Binary ************/
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, IProcessResult> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JsonData> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JObject> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JObject[]> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, byte[]> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, BinaryData> func);
    /************ Non-async Dynamic ************/
    public ITaskBuilder Dynamic(Func<IScriptTaskContext, dynamic, CancellationToken, dynamic> func);
    /************ Async Dynamic ************/
    public ITaskBuilder Dynamic(Func<IScriptTaskContext, dynamic, CancellationToken, Task<dynamic>> func);
    public ITaskBuilder WithDefaultConfigKey(string defaultConfigKey);
}
interface IServiceBuilder
{
    public IServiceBuilder Instance(string name, IService instance);
    public IServiceBuilder Register(string typeName, Type serviceType);
}
interface IScriptTaskContext
{
    IDictionary<string, string> Config { get; }
    ILogger Logger { get; }
    ILoggerFactory LoggerFactory { get; }
    IPipelineContext PipelineContext { get; }
    IServiceProvider ServiceProvider { get; }
    Task<string> BindValue(string bindingExpression);
    Task<string> GetExpandedConfigValue(string key, bool throwIfNotSet = true);
    IService GetService(string name);
    IRunTaskBuilder RunTask(string name);
    Task SetValue(string path, object val);
}
public static class ScriptTaskContextExtensions
{
    public static async Task<TService> GetService<TService>(this IScriptTaskContext context,
        string defaultName,
        string configKeyName = "service",
        bool throwIfNotFound = true) where TService : class;
    public static T GetAndConvertConfigValue<T>(this IScriptTaskContext context, string key, T defaultValue = default(T));
    public static string GetConfigValue(this IScriptTaskContext context, string key, bool throwIfNotSet = true);
}
public class JsonData : IProcessResultData
{
    public JsonData(JObject jsonData, JObject metadata = null);
    public JObject Metadata { get; }
    public JObject Data { get; }
    public static JsonData Empty { get; }
}
public class BinaryData : IProcessResultData
{
    public BinaryData(byte[] binaryData, JObject metadata = null);
    public JObject Metadata { get; }
    public byte[] Data { get; }
    public static BinaryData Empty { get; }
}
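The configuration helpers in ScriptTaskContextExtensions are listed above but not used in the earlier examples. The following sketch shows how a task might read its own settings with GetConfigValue and GetAndConvertConfigValue<T>, using the signatures from the reference. The task name truncate and the config keys field and length are hypothetical, and the exact conversion rules of GetAndConvertConfigValue are an assumption.

```csharp
PipelineTask("truncate").Json((context, json, cancellationToken) =>
{
    // Required setting: name of the field to shorten.
    var field = context.GetConfigValue("field");

    // Optional setting with a fallback of 80 characters.
    var length = context.GetAndConvertConfigValue<int>("length", 80);

    // Only touch the field when it holds a string longer than the limit.
    var text = (string)json.Data[field];
    if (text != null && text.Length > length)
    {
        json.Data[field] = text.Substring(0, length);
    }
    return json;
});
```

Following the pattern of the sendGreeting example, such a task would presumably be configured in a pipeline as { truncate, field: title, length: 20 }.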